iT邦幫忙

2024 iThome 鐵人賽

DAY 5
0
DevOps

我獨自升級:從水管工走向 DataOps系列 第 5

【Day 5】Airflow 連接 AWS S3 - S3Hook

  • 分享至 

  • xImage
  •  

前言

接續昨天的透過 IAM 取得 token 之後,今天就要來 create S3 Bucket,接著使用 Airflow 的 S3Hook 來存取 AWS S3 的檔案。

創建一個 S3 Bucket

下方基本上都是照著預設,有些部分可以就我所知說明一下:

  • ACLs: access control lists,是做更進階的權限存取時會使用到
  • Block Public Access:打勾就是不允許公開訪問,但如果還是想透過網址訪問物件,會需要帶Security-TokenSignature 才行
  • Bucket Versioning:就是版本控制,有 enable 開啟的話,就能在誤刪檔案時快速恢復到過去的版本救援。
  • 大部分都可以創建之後再修改,所以練習時只需要照著預設去做就好

https://ithelp.ithome.com.tw/upload/images/20240919/20135427rWCiYRuEoN.jpg

AWS S3 連接方法二:直接設定在環境變數

昨天的方法ㄧ是透過 Airflow Web UI 來設定 Connection

打開 airflow project,在 docker-compose.yaml 的 environment 當中加上 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY,下方為官方範例,要修改成昨天 csv 檔案當中的值。

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.1}
  # build: .
  environment:
    ...
    AWS_ACCESS_KEY_ID: AKIAIOSFODNN7EXAMPLE
    AWS_SECRET_ACCESS_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  • 有在環境變數設定過後,conn_id 就可以直接用預設的 aws_default 來存取

S3Hook 範例 task 說明

S3Hook 將檔案從 s3 下載到 local

def s3_extract():
    source_s3_key = "s3_demo/s3_extract.txt"
    source_s3_bucket = "ithome-it30-demo"
    dest_file_path = "/opt/airflow/dags/"
    source_s3 = S3Hook(aws_conn_id="aws_default")
    source_s3.download_file(
		key=source_s3_key,
		bucket_name=source_s3_bucket,
		local_path=dest_file_path
	)
  • s3_extract() 函數的目的是從 S3 存儲桶(ithome-it30-demo)中的指定路徑(s3_demo/s3_extract.txt)下載文件到本地目錄裡(/opt/airflow/dags/
  • source_s3 = S3Hook(aws_conn_id="aws_default") 使用 Airflow 的 S3Hook 來與 AWS S3 進行互動,並使用已經在 Airflow UI 中配置的 AWS 連接(aws_default)。
  • source_s3.download_file() 調用了 S3Hook 的 download_file 方法,將檔案從 S3 下載到指定的本地路徑。

S3Hook 將檔案上傳到 s3

def s3_upload():
    source_s3_key = "s3_demo/s3_extract.txt"
    source_s3_bucket = "ithome-it30-demo"
    dest_file_path = "/opt/airflow/dags/s3_extract.txt"
    source_s3 = S3Hook(aws_conn_id="aws_default")
    source_s3.load_file(
        filename=dest_file_path,
        key=source_s3_key,
        bucket_name=source_s3_bucket
    )
  • 只需要將 download_file 改成 load_file 就能將檔案從 local 上傳至 AWS S3 了

完整程式碼

from datetime import datetime
from airflow.decorators import task, dag
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def s3_extract():
    source_s3_key = "s3_demo/s3_extract.txt"
    source_s3_bucket = "ithome-it30-demo"
    dest_file_path = "/opt/airflow/dags/"
    source_s3 = S3Hook(aws_conn_id="aws_default")
    source_s3.download_file(
		key=source_s3_key,
		bucket_name=source_s3_bucket,
		local_path=dest_file_path
	)

@dag(
    dag_id="s3_extract_taskflow",
    start_date=datetime(2024, 9, 18),
    schedule=None,
    catchup=False,
)
def s3_extract_dag():
    s3_extract()

s3_extract_dag()

上一篇
【Day 4】Airflow 連接 AWS S3 - 介紹與 IAM 設定
下一篇
【Day 6】Airflow 用 Slack 傳送通知 - Slack API 設定
系列文
我獨自升級:從水管工走向 DataOps21
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言